/**
 * Copyright (C) @2014 Webank Group Holding Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.webank.framework.message.subscriber;

import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.ReloadableResourceBundleMessageSource;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

import cn.webank.framework.biz.service.support.WeBankServiceDispatcher;
import cn.webank.framework.biz.utils.ServiceUtil;
import cn.webank.framework.dto.BizErrors;
import cn.webank.framework.message.integration.RMBSao;
import cn.webank.rmb.api.PipeSubscriber;
import cn.webank.rmb.destination.Destination;
import cn.webank.rmb.message.Message;

/**
 * 标准队列消息监听类,是主动到rmb拉消息的模式
 * 
 * @author jonyang
 *
 */
public class PullMessageSubscriber extends PipeSubscriber {

	private static final long serialVersionUID = -3914796585877672583L;
	// private static final JsonMapper jsonMapper =
	// JsonMapper.nonDefaultMapper();

	private final static Logger LOG = LoggerFactory.getLogger(PullMessageSubscriber.class);
	/**
	 * 任务执行队列
	 */
	private ThreadPoolTaskExecutor taskExecutor;

	/**
	 * 服务分发类
	 */
	private WeBankServiceDispatcher serviceDispatcher;

	/**
	 * 国际化消息管理资源类
	 */
	private ReloadableResourceBundleMessageSource bundleMessageSource;

	/**
	 * RMB服务类
	 */
	private RMBSao rmbSao;

	/**
	 * 服务方系统ID4位
	 */
	private String sysId;

	/**
	 * 服务所在的dcn号
	 */
	private String dcnNo;

	/**
	 * 空消息列表次数计数器
	 */
	private AtomicInteger emptyMessageListCount = new AtomicInteger(0);

	// /**
	// * 消息生命周期时间,建议设置为 （平均超时-平均交易时间）,单位:秒&lt;0 表示不考虑
	// *
	// */
	// private long messageLiveTimeMillis = -1;

	// public MessageSubscriber(String serviceSysId, String dcnNo,
	// List<Destination> destinations,
	// ThreadPoolTaskExecutor taskExecutor, int timeoutSeconds,
	// WeBankServiceDispatcher serviceDispatcher, RMBSao rmbSao,
	// ReloadableResourceBundleMessageSource bundleMessageSource,
	// long messageLiveTimeMillis) {
	// this(serviceSysId, dcnNo, destinations, taskExecutor, timeoutSeconds,
	// serviceDispatcher, rmbSao, bundleMessageSource);
	// //this.messageLiveTimeMillis = messageLiveTimeMillis;
	// }

	/**
	 * 
	 * @param sysId
	 *            本系统ID
	 * @param dcnNo
	 *            dcn号
	 * @param destinations
	 *            rmb目标列表
	 * @param taskExecutor
	 *            执行线程池
	 * @param timeoutSeconds
	 *            监听rmb超时时间
	 * @param serviceDispatcher
	 *            服务分发类
	 * @param rmbSao
	 *            rmbSao对象
	 * @param bundleMessageSource
	 *            消息源
	 */
	public PullMessageSubscriber(String sysId, String dcnNo, List<Destination> destinations,
			ThreadPoolTaskExecutor taskExecutor, int timeoutSeconds, WeBankServiceDispatcher serviceDispatcher,
			RMBSao rmbSao, ReloadableResourceBundleMessageSource bundleMessageSource) {

		// validate parameter
		Assert.isTrue(!CollectionUtils.isEmpty(destinations), "listener topics is null");
		Assert.notNull(taskExecutor, "listener taskExecutor is null");
		Assert.notNull(serviceDispatcher, "listener serviceDispatcher is null");
		Assert.notNull(bundleMessageSource, "listener ResourceBundleMessageSource is null");
		Assert.notNull(sysId, "listener serviceSysId is null");

		// init
		this.sysId = sysId;
		this.setDestinations(destinations);
		this.setTimeout(timeoutSeconds * 1000);
		this.taskExecutor = taskExecutor;
		this.serviceDispatcher = serviceDispatcher;
		this.bundleMessageSource = bundleMessageSource;
		this.dcnNo = dcnNo;
		this.rmbSao = rmbSao;
	}

	// /**
	// * @return the serviceSysId
	// */
	// public String getServiceSysId() {
	// return serviceSysId;
	// }
	//
	// /**
	// * @param serviceSysId
	// * the serviceSysId to set
	// */
	// public void setServiceSysId(String serviceSysId) {
	// this.serviceSysId = serviceSysId;
	// }

	/**
	 * @return the sysId
	 */
	public String getSysId() {
		return sysId;
	}

	/**
	 * @param sysId
	 *            the sysId to set
	 */
	public void setSysId(String sysId) {
		this.sysId = sysId;
	}

	/**
	 * 处理批量消息的方法，由具体应用来实现
	 * 
	 * @param messages
	 *            消息列表
	 */
	@Override
	public void handle(List<Message> messages) {
		StringBuilder bizSeqNos = null;

		try {
			this.setOpen(false);// 暂时不再需要fetch message
			if (messages == null || messages.size() == 0) {
				Thread.sleep((emptyMessageListCount.get() >= 25 ? emptyMessageListCount.getAndSet(0)
						: emptyMessageListCount.addAndGet(1)) * 2);
				LOG.debug("message is null");
			} else {
				emptyMessageListCount.set(0);

				bizSeqNos = new StringBuilder();
				for (Message m : messages) {
					bizSeqNos.append(m.getSysHeader().getBizSeqNo() + "\n");
					// if ((this.messageLiveTimeMillis > 0)
					// && (m.getCreateTime().getTime() +
					// this.messageLiveTimeMillis) <= System
					// .currentTimeMillis()) {
					// continue; // ignore this message,invalid message
					// }

					BizErrors errors = new BizErrors();
					Runnable r = ServiceUtil.constructExcutorTask(m, dcnNo, serviceDispatcher, rmbSao,
							bundleMessageSource, this.sysId, errors, Locale.CHINESE);
					taskExecutor.submit(r);

				}
			}

		} catch (Exception e) {
			LOG.error("system error,bizSeqNos:" + (bizSeqNos != null ? bizSeqNos.toString() : ""), e);
		} finally {
			int remainingCapacity = this.taskExecutor.getThreadPoolExecutor().getQueue().remainingCapacity();
			this.setHandleMax(remainingCapacity);// fetch message size
			this.setOpen(true);// 开启fetch message
		}

		LOG.debug("fetch batch message end:" + (messages != null ? messages.hashCode() : ""));
	}

}
